A SparkSession is the entry point for connecting to a Spark cluster.
from pyspark.sql import SparkSession
We will use Pandas to operate on the reduced data in the driver program.
import pandas as pd
NumPy is always useful.
import numpy as np
Create a new session (or reuse an existing one).
spark = SparkSession.builder.getOrCreate()
spark
We can see that the session is established.
We can list the tables in our Spark Session, currently empty.
print(spark.catalog.listTables())
We can create a Pandas DataFrame with random values.
pd_temp = pd.DataFrame(np.random.random(100))
The plot shows that the values are indeed random:
import matplotlib
import matplotlib.pyplot as plt
matplotlib.style.use('ggplot')
pd_temp.plot()
Now we can convert it into a Spark DataFrame:
spark_temp = spark.createDataFrame(pd_temp)
createOrReplaceTempView creates (or replaces if that view name already exists) a lazily evaluated "view" that you can then use like a table in Spark SQL.
It does not persist to memory unless you cache (persist) the dataset that underpins the view.
spark_temp.createOrReplaceTempView("temp")
The created view is TEMPORARY, which means it lives only in this session and is not persistent.
print(spark.catalog.listTables())
spark_temp.show()
We can now use transformations on this DataFrame. The transformations are translated (compiled) to RDD transformations.
from pyspark.sql.functions import col, asc
spark_temp.filter((col('0') > 0.9)).show()
file_path = "airports.csv"
# Read in the airports data
airports = spark.read.csv(file_path, header=True)
# Show the data
airports.show()
It may be useful to convert a DataFrame to Pandas for quick browsing.
Warning! This is not efficient for large datasets, as it collects the whole dataset into the driver's memory.
airports.toPandas()
airports.createOrReplaceTempView("airports")
# Get the first 10 rows of airports
query = "FROM airports SELECT * LIMIT 10"
airports10 = spark.sql(query)
# Show the results
airports10.show()
Read data from CSV file:
inferSchema - detect which columns contain numbers (not strings!) - useful e.g. for sorting.
header - read the first line as column names.
countries = spark.read.csv("countries of the world.csv", inferSchema=True, header=True)
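To see why inferSchema matters for sorting, here is a quick plain-Python illustration (not Spark-specific) of lexicographic vs. numeric ordering:

```python
# Without inferSchema every CSV column is read as a string, and strings
# sort lexicographically: "10" comes before "2".
as_strings = sorted(["10", "2", "1"])
as_numbers = sorted([10, 2, 1])
print(as_strings)  # ['1', '10', '2']
print(as_numbers)  # [1, 2, 10]
```

The same mis-ordering would silently affect orderBy on a string-typed numeric column.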
countries.toPandas()
We can inspect the schema of the DataFrame.
countries.printSchema()
countries.createOrReplaceTempView("countries")
spark.sql("SELECT * FROM countries WHERE Region LIKE '%OCEANIA%'").toPandas()
DSL = Domain-Specific Language - an API that resembles a natural (or other) language, implemented as a library inside another language.
List all the countries with a population > 38 million
countries.filter((col("Population") > 38000000)).orderBy("Population").toPandas()
Select all the countries from Europe
countries.select("Country", "Population").where(col("Region").like("%EUROPE%")).show()
Conditions in where clause can contain logical expressions.
countries.select("Country", "Population")\
.where((col("Region").like("%EUROPE%")) & (col("Population")> 10000000)).show()
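The same precedence gotcha exists in pandas: `&` binds tighter than comparison operators, so each condition needs its own parentheses. A minimal pandas sketch with made-up data:

```python
import pandas as pd

df = pd.DataFrame({"Country": ["A", "B", "C"],
                   "Region": ["EUROPE", "EUROPE", "ASIA"],
                   "Population": [5_000_000, 15_000_000, 25_000_000]})
# & binds tighter than >, so each comparison must be parenthesized,
# exactly as in the Spark where() clause above
selected = df[(df["Region"] == "EUROPE") & (df["Population"] > 10_000_000)]
print(selected["Country"].tolist())  # ['B']
```

Dropping the parentheses raises an error in both libraries, because Python would try to evaluate `10_000_000 & df["Region"]`-style subexpressions first.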
We can run aggregations with predefined functions (faster than UDFs, since they run natively inside Spark):
from pyspark.sql.functions import sum  # note: this shadows Python's built-in sum
pd_countries = countries.select("Region", "Population").groupBy("Region").agg(sum("Population")).toPandas()
pd_countries
We can make the column name look better by using an alias:
pd_countries = countries.select("Region", "Population").groupBy("Region").agg(sum("Population").alias('Total')).toPandas()
pd_countries
Pandas DataFrames are useful for plotting with Matplotlib:
pd_countries.plot(x='Region', y='Total',kind='bar', figsize=(10, 6))
Our countries DataFrame has some problems: numeric columns use commas as decimal separators (so they were read as strings), and some rows contain missing values.
We can clean the data using User Defined Functions (UDF)
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
Define a Python function which converts comma-decimal strings to float:
def to_float(s):
    return float(s.replace(',', '.'))
Test that it works:
to_float('0,99')
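Note that to_float raises AttributeError on missing values, because None has no .replace method. A null-safe variant one could use instead (a sketch, not part of the original notebook):

```python
def to_float_safe(s):
    """Convert a comma-decimal string to float; pass missing values through."""
    if s is None:
        return None
    return float(s.replace(',', '.'))

print(to_float_safe('0,99'))  # 0.99
print(to_float_safe(None))    # None
```

Registering this version with udf(to_float_safe, FloatType()) would let the UDF survive empty cells, at the cost of keeping nulls in the result column.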
Now define a Spark UDF:
float_udf = udf(to_float , FloatType())
Test it on a DataFrame:
countries.withColumn("Literacy", float_udf("Literacy (%)"))
The schema of the returned DataFrame shows that Literacy is now a float.
countries.show(50)
# countries.where((col("Literacy") < 50) & (col("GDP ($ per capita)") > 700)).show()
Oops, what does this error mean?
Before we can use the new column, we need to remove the empty rows; otherwise our UDF fails when it calls .replace on a null value.
full_countries = countries.select('Country', 'Population', 'Literacy (%)', 'GDP ($ per capita)').na.drop()
We can now apply the new UDF to the DataFrame:
full_countries = full_countries.withColumn("Literacy", float_udf("Literacy (%)"))
full_countries.show(50)
full_countries.where((col("Literacy") < 50) & (col("GDP ($ per capita)") > 700)).show()
full_countries.toPandas().plot(x="Literacy",y="GDP ($ per capita)",kind="scatter",figsize=(10, 6))
Plot the Birthrate and Deathrate columns against GDP per capita for the 30 richest countries (Birthrate vs. GDP and Deathrate vs. GDP on one chart, in different colours) and for the 30 poorest countries (likewise).
countries.toPandas()
countries_plot = countries.select('Country', 'GDP ($ per capita)', 'Birthrate', 'Deathrate').na.drop()
top_30 = countries_plot.withColumn("Birthrate", float_udf("Birthrate"))\
.withColumn("Deathrate", float_udf("Deathrate"))\
.withColumn("GDP ($ per capita)", col("GDP ($ per capita)").cast("float"))\
.orderBy("GDP ($ per capita)", ascending=False)\
.limit(30)
top_30_df = top_30.toPandas()
top_30_df.head(5)
plt.figure(figsize=(10, 6))
plt.scatter(top_30_df['Birthrate'], top_30_df['GDP ($ per capita)'], label='Birthrate')
plt.scatter(top_30_df['Deathrate'], top_30_df['GDP ($ per capita)'], label='Deathrate')
plt.ylabel('GDP ($ per capita)')
plt.legend()
plt.show()
The richest countries are often characterised by similar levels of Birthrate and Deathrate.
bot_30 = countries_plot.withColumn("Birthrate", float_udf("Birthrate"))\
.withColumn("Deathrate", float_udf("Deathrate"))\
.withColumn("GDP ($ per capita)", col("GDP ($ per capita)").cast("float"))\
.orderBy("GDP ($ per capita)", ascending=True)\
.limit(30)
bot_30_df = bot_30.toPandas()
plt.figure(figsize=(10, 6))
plt.scatter(bot_30_df['Birthrate'], bot_30_df['GDP ($ per capita)'], label='Birthrate')
plt.scatter(bot_30_df['Deathrate'], bot_30_df['GDP ($ per capita)'], label='Deathrate')
plt.ylabel('GDP ($ per capita)')
plt.legend()
plt.show()
The poorest countries are characterised by a Birthrate greater than the Deathrate.
Load the "airports.csv" dataset from https://www.kaggle.com/jonatancr/airports (it contains data about airports from all over the world). Since this dataset has no header, assign your own column names using the file description on that page. Draw the locations of all (or selected) airports on a chart (map).
airports = spark.read.csv("airports.csv",header=False)
airports.toPandas().head(2)
airports = airports.toDF("AirportID", "Name", "City", "Country", "IATA", "ICAO", "Latitude",
                         "Longitude", "Altitude", "Timezone", "DST", "Tz", "Type", "Source")
airports.toPandas().head(2)
air_points_pd = airports.select("Name", "Latitude", "Longitude").toPandas()
air_points_pd.head(3)
import folium
m = folium.Map()
for row in air_points_pd.itertuples(index=False):
    folium.Marker((float(row.Latitude), float(row.Longitude)), popup=row.Name).add_to(m)
m
Find the 10 countries whose lowest-lying airport has the highest altitude, giving the results in metres above sea level.
airports.toPandas().head(2)
airports = airports.withColumn("Altitude", col("Altitude").cast(FloatType()))
from pyspark.sql import functions as F
low = airports.groupBy("Country")\
.agg(F.first("Name").alias("Name"), F.min("Altitude").alias("Altitude"))
low.toPandas()
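Note that F.first("Name") returns an arbitrary name from each group, not necessarily the airport at the minimum altitude. One way to keep the name and its altitude together, sketched here with pandas and made-up data, is to sort first and then take the first row per group:

```python
import pandas as pd

df = pd.DataFrame({
    "Country": ["A", "A", "B"],
    "Name": ["Lowland", "Highland", "Only"],
    "Altitude": [10.0, 500.0, 30.0],
})
# Sort by altitude, then keep the first (i.e. lowest) airport in each country,
# so Name and Altitude stay paired
lowest = df.sort_values("Altitude").groupby("Country", as_index=False).first()
print(lowest)
```

In Spark, an analogous trick is to aggregate with F.min(F.struct("Altitude", "Name")), since structs compare field by field.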
top = low.orderBy("Altitude", ascending=False)
top = top.withColumn("Altitude [m]", col("Altitude") * 0.3048)\
.limit(10)
top.toPandas()
On a scatter plot, draw the number of airports as a function of country area.
sorted(airports.toPandas()['Country'].unique())
airports = airports.orderBy('Country')
airports.toPandas().head(2)
sorted(countries.toPandas()['Country'].unique())
countries = countries.orderBy('Country')
countries.toPandas().head(2)
from pyspark.sql.functions import upper, count, isnull, trim
airports = airports.withColumn('Country', trim(airports['Country']))
countries = countries.withColumn('Country', trim(countries['Country']))
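Stray whitespace silently breaks equality joins, because 'Poland ' != 'Poland'. A small pandas analogue of the same normalization, with made-up data:

```python
import pandas as pd

airports_pd = pd.DataFrame({"Country": ["Poland ", "France"]})
countries_pd = pd.DataFrame({"Country": ["Poland", "France"],
                             "Area": [312_696, 643_801]})
# Without strip(), 'Poland ' would not match 'Poland' and Area would be NaN
airports_pd["Country"] = airports_pd["Country"].str.strip()
merged = airports_pd.merge(countries_pd, on="Country", how="left")
print(merged)
```

After trimming, every row finds its match; that is exactly what trim() does for the Spark join below.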
merged = airports.alias('a').join(countries.alias('c'), on=airports['Country'] == countries['Country'], how='left')
result = merged.groupBy('a.Country', 'c.`Area (sq. mi.)`').agg(count('a.AirportID').alias('Amount'))
result_sorted = result.orderBy(col('Amount').desc())
result_sorted.show()
Some of them are dependent territories (islands) of another country, like Anguilla (UK). Many of the names had stray whitespace, which is why it was trimmed above.
merged = airports.alias('a').join(
countries.alias('c'),
on=col('a.Country') == col('c.Country'),
how='full_outer'
)
diff_countries = merged.filter(isnull(col('a.Country')) | isnull(col('c.Country')))
diff_countries.select('a.Country', 'c.Country').distinct().show(100)
result_sorted = result_sorted.toPandas()
plt.figure(figsize=(10, 6))
plt.scatter(result_sorted['Area (sq. mi.)'], result_sorted['Amount'], c='blue')
plt.xlabel('Country Area (sq. mi.)')
plt.ylabel('Amount of airports')
plt.title('Relationship between country area and airports number')
plt.show()
Count how many airports there are on each continent. This task also requires joining the Countries and Airports frames. Present the result on a pie chart.
merged = airports.alias('a').join(countries.alias('c'), on=airports['Country'] == countries['Country'], how='left')
result = merged.groupBy('c.Region').agg(count('a.AirportID').alias('Amount'))
result_sorted = result.orderBy(col('Amount').desc())
result_sorted.show()
result_sorted = result_sorted.toPandas()
plt.figure(figsize=(10, 10))
plt.pie(result_sorted['Amount'], labels=result_sorted['Region'], autopct='%1.1f%%', textprops={'fontsize': 14})
plt.title('Airports distribution by region', fontweight='bold', size=20)
plt.show()